package discover

import (
	"context"
	"encoding/base64"
	"fmt"
	"gitee.com/zhancaihua/goyt/core/logyt"
	"google.golang.org/grpc/codes"
	gresolver "google.golang.org/grpc/resolver"
	"google.golang.org/grpc/status"
	"strings"
	"sync"
	"time"
)

// SubOption is a function that receives a *Subscriber; the name suggests a
// functional option, though nothing in this file applies one.
type SubOption func(sub *Subscriber)
// Subscriber is the resolver builder registered under the "etcd" scheme
// (see its Build and Scheme methods). It carries no state of its own.
type Subscriber struct {
}

// NewEtcdSubscriber allocates a zero-valued Subscriber and returns a pointer
// to it.
func NewEtcdSubscriber() *Subscriber {
	return new(Subscriber)
}
// authorityToEndpoints reverses endpointsToAuthorityTo: it base64-decodes
// (standard alphabet) auth and splits the decoded text on endpointsSeparator.
//
// The decode error is wrapped with %w so callers can still inspect the
// underlying base64 error via errors.Is / errors.As.
func authorityToEndpoints(auth string) ([]string, error) {
	raw, err := base64.StdEncoding.DecodeString(auth)
	if err != nil {
		return nil, fmt.Errorf("resolver: failed to decode Target host: %w", err)
	}
	return strings.Split(string(raw), endpointsSeparator), nil
}
// endpointsToAuthorityTo joins endpoints with endpointsSeparator and returns
// the standard-alphabet base64 encoding of the joined string.
func endpointsToAuthorityTo(endpoints []string) string {
	joined := strings.Join(endpoints, endpointsSeparator)
	return base64.StdEncoding.EncodeToString([]byte(joined))
}

// Build creates a resolver for an "etcd" target.
//
// The target's endpoint (path) is the key to watch and the URL host is the
// base64-encoded list of registry endpoints. Build obtains a registry
// connection, opens a watch channel for the key and starts a goroutine that
// pushes address updates to cc.
//
// All failures are returned as gRPC status errors so the client can handle
// them: InvalidArgument for a malformed target, Internal for registry errors.
func (w *Subscriber) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) {
	key := target.Endpoint()
	auth := target.URL.Host
	if len(key) < 1 {
		// Build should return a gRPC error so that the client can handle it.
		return nil, status.Error(codes.InvalidArgument, "resolver: invalid endpoint key is empty")
	}
	if len(auth) < 1 {
		return nil, status.Error(codes.InvalidArgument, "resolver: invalid endpoint hosts is empty")
	}
	hosts, err := authorityToEndpoints(auth)
	if nil != err {
		return nil, status.Errorf(codes.InvalidArgument, "resolver: fail to parse endpoint hosts %s", err)
	}
	cli, err := GetRegistry().GetConn(hosts)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "resolver: fail to establish endpoint conn %s", err)
	}

	em, err := NewManager(cli, key)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "resolver: failed to new endpoint manager: %s", err)
	}

	// Create the context only once it is needed, and release it on failure so
	// an unsuccessful Build does not leak it.
	ctx, cancel := context.WithCancel(context.Background())
	wch, err := em.NewWatchChannel(ctx)
	if err != nil {
		cancel()
		return nil, status.Errorf(codes.Internal, "resolver: failed to new watch channer: %s", err)
	}

	r := &resolver{
		// target and endpoints are kept so the watch can be re-established later.
		target:    key,
		cc:        cc,
		endpoints: hosts,
		wch:       wch,
		ctx:       ctx,
		cancel:    cancel,
	}

	r.wg.Add(1)
	go r.watch()
	return r, nil
}

// Scheme returns "etcd", the scheme name this builder reports.
func (w *Subscriber) Scheme() string {
	return "etcd"
}

// resolver forwards updates received on wch to cc from a background goroutine
// (see watch). ctx/cancel stop that goroutine and wg lets Close wait for it.
type resolver struct {
	cc     gresolver.ClientConn
	wch    WatchChannel
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	// Registry cluster endpoints to fetch data from, decoded from the target's
	// authority. Cached so the connection can be re-established.
	endpoints []string
	// The key (namespace) being watched. Cached for the same reason.
	target string
}

// doKeepalive re-establishes the watch after the watch channel has been
// closed. Once per second it tries to get a registry connection, build a new
// manager and open a new watch channel; on the first fully successful attempt
// it installs the channel in r.wch, starts a new watch goroutine and returns.
// Any failed step is logged and retried on the next tick.
//
// It returns as soon as r.ctx is cancelled, since there is no point in
// reconnecting a closed resolver. r.ctx and r.cancel are deliberately not
// replaced: Close relies on them to stop and synchronise with the goroutines.
//
// The returned error is always nil.
//
// NOTE(review): the log messages end in a literal "%s" although the error is
// passed as a LogField; confirm whether logyt.WarnLog formats its message.
func (r *resolver) doKeepalive() error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		// Wait for the next retry tick, but stop right away on cancellation
		// instead of blocking Close for up to a full tick.
		select {
		case <-r.ctx.Done():
			return nil
		case <-ticker.C:
		}
		// select picks at random when both cases are ready; cancellation wins.
		if r.ctx.Err() != nil {
			return nil
		}

		// Re-acquire the connection (the registry may have crashed and
		// restarted) and create a fresh watch channel.
		cli, err := GetRegistry().GetConn(r.endpoints)
		if err != nil {
			logyt.WarnLog("resolver: fail to establish endpoint conn %s", logyt.LogField{
				Key:   "error",
				Value: err,
			})
			continue
		}
		em, err := NewManager(cli, r.target)
		if err != nil {
			logyt.WarnLog("resolver: failed to new endpoint manager: %s", logyt.LogField{
				Key:   "error",
				Value: err,
			})
			// Do not use em after a failed NewManager; retry on the next tick.
			continue
		}
		wch, err := em.NewWatchChannel(r.ctx)
		if err != nil {
			logyt.WarnLog("resolver: failed to new watch channer: %s", logyt.LogField{
				Key:   "error",
				Value: err,
			})
			// Do not start a watcher on a channel that failed to open.
			continue
		}

		r.wch = wch
		r.wg.Add(1) // one more watch goroutine is about to start
		go r.watch()
		return nil
	}
}
// watch is the body of the resolver's background goroutine. It folds every
// batch received on r.wch into a local key -> update map (Put stores, Del
// removes) and pushes the resulting address list to the client connection.
//
// It exits when r.ctx is cancelled, or when r.wch is closed; in the latter
// case it first calls doKeepalive, which is responsible for starting a
// replacement goroutine. wg.Done is always called on exit.
func (r *resolver) watch() {
	defer r.wg.Done()

	// Accumulated view of live entries; local to this goroutine, so a
	// re-established watch starts from an empty map.
	current := make(map[string]*UpdateOp)
	for {
		select {
		case <-r.ctx.Done():
			// Resolver closed: stop the goroutine.
			return
		case batch, open := <-r.wch:
			if !open {
				// Per the original author: the channel normally closes only if
				// the watch stream ended unexpectedly or etcd went down.
				// Try to re-establish, then let this goroutine finish.
				if err := r.doKeepalive(); err != nil {
					logyt.WarnLog(fmt.Sprintf("resolver fail to doKeepalive %s", err))
				}
				return
			}

			for _, op := range batch {
				if op.Op == Put {
					current[op.Key] = op
				} else if op.Op == Del {
					delete(current, op.Key)
				}
			}

			state := gresolver.State{Addresses: convertToGRPCAddress(current)}
			err := r.cc.UpdateState(state)
			if err == nil {
				continue
			}
			logyt.WarnLog("etcd UpdateState failed", logyt.LogField{
				Key:   "error",
				Value: err,
			})
		}
	}
}

// convertToGRPCAddress builds one gresolver.Address per map entry, using the
// entry's Endpoint.Addr as the address. Map keys are ignored and the order of
// the result follows map iteration order. An empty map yields a nil slice.
func convertToGRPCAddress(ups map[string]*UpdateOp) []gresolver.Address {
	if len(ups) == 0 {
		return nil
	}
	out := make([]gresolver.Address, 0, len(ups))
	for key := range ups {
		out = append(out, gresolver.Address{Addr: ups[key].Endpoint.Addr})
	}
	return out
}

// ResolveNow is a no-op here.
// It is only a hint; this resolver ignores it because updates are pushed to
// the client connection by the watch goroutine.
func (r *resolver) ResolveNow(gresolver.ResolveNowOptions) {}

// Close cancels the resolver's context, which makes the watch goroutine exit,
// and then waits for it. Per the original author's note, this releases the
// etcd watch resources and is invoked when the client conn is closed.
func (r *resolver) Close() {
	r.cancel()
	// Wait for the goroutine(s) to finish; otherwise Close could return before
	// their resources have actually been released.
	r.wg.Wait()
}

// DirectBuilder is the resolver builder for the "direct" scheme; it holds no state.
type DirectBuilder struct{}

// Build resolves a "direct" target to a fixed address list.
//
// The base64-encoded endpoint list is read from the target's URL host. If the
// host is empty it is read from the target's endpoint (path) instead, which is
// where a target of the form "direct:///<encoded>" carries it. The decoded
// addresses are pushed to cc once and a no-op resolver is returned.
//
// A missing or undecodable endpoint list yields an InvalidArgument status
// error; an error from cc.UpdateState is returned unchanged.
func (d *DirectBuilder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) {
	auth := target.URL.Host
	if len(auth) < 1 {
		// "direct:///<encoded>" has an empty authority; the payload is the path.
		auth = target.Endpoint()
	}
	if len(auth) < 1 {
		return nil, status.Error(codes.InvalidArgument, "resolver: invalid endpoint hosts is empty")
	}
	hosts, err := authorityToEndpoints(auth)
	if nil != err {
		return nil, status.Errorf(codes.InvalidArgument, "resolver: fail to parse endpoint hosts %s", err)
	}
	addrs := make([]gresolver.Address, 0, len(hosts))

	for _, val := range hosts {
		addrs = append(addrs, gresolver.Address{
			Addr: val,
		})
	}
	if err := cc.UpdateState(gresolver.State{
		Addresses: addrs,
	}); err != nil {
		return nil, err
	}

	return &nopResolver{cc: cc}, nil
}

// Scheme returns "direct", the scheme name this builder reports.
func (d *DirectBuilder) Scheme() string {
	return "direct"
}

// nopResolver is the resolver returned by DirectBuilder.Build. Its Close and
// ResolveNow methods do nothing; it only holds the client connection.
type nopResolver struct {
	cc gresolver.ClientConn
}

// Close does nothing.
func (r *nopResolver) Close() {
}

// ResolveNow does nothing.
func (r *nopResolver) ResolveNow(options gresolver.ResolveNowOptions) {
}

// BuildDirectTarget returns a string that represents the given endpoints with direct schema.
func BuildDirectTarget(endpoints []string) string {
	return fmt.Sprintf("%s:///%s", "direct",
		endpointsToAuthorityTo(endpoints))
}

// BuildDiscovTarget returns a string that represents the given endpoints with discov schema.
func BuildDiscovTarget(endpoints []string, key string) string {
	return fmt.Sprintf("%s://%s/%s", "etcd",
		endpointsToAuthorityTo(endpoints), key)
}
